package dslab.monitoring;

import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.BindException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import at.ac.tuwien.dsg.orvell.*;
import at.ac.tuwien.dsg.orvell.annotation.Command;
import dslab.ComponentFactory;
import dslab.util.Config;
import dslab.exceptions.MisconfigurationException;

/**
 * feature complete MonitoringServer that implements a shell and a UDP listener for incoming transactions
 */
public class MonitoringServer implements IMonitoringServer {

    // configuration keys that have to be present
    private static final String[] requiredConfigKeys = new String[] {
        "udp.port"
    };

    private final String componentId;
    private final Config config;
    private final InputStream inputStream;
    private final PrintStream printStream;
    private DatagramSocket serverSocket;

    // central data structures storing previously monitored transactions
    private final ConcurrentHashMap<String, Long> addresses;
    private final ConcurrentHashMap<String, Long> servers;

    /**
     * Creates a new server instance.
     *
     * @param componentId the id of the component that corresponds to the Config resource
     * @param config the component config
     * @param in the input stream to read console input from
     * @param out the output stream to write console output to
     */
    public MonitoringServer(String componentId, Config config, InputStream in, PrintStream out) throws MisconfigurationException {
        // print configuration for the benefit of the user
        System.out.printf("@%s: configuration:%n", componentId);
        for (String key : config.listKeys())
            System.out.printf("@%s: { %s: %s }%n", componentId, key, config.getString(key));

        this.componentId = componentId;
        this.config = config;
        this.inputStream = in;
        this.printStream = out;
        this.addresses = new ConcurrentHashMap<>();
        this.servers = new ConcurrentHashMap<>();

        // check if required keys are present
        for (String key : requiredConfigKeys)
            if (!config.containsKey(key))
                throw new MisconfigurationException();
    }

    @Override
    public void run() {
        try {
            // bind server socket to port and wait on connections
            serverSocket = new DatagramSocket(config.getInt("udp.port"));

            // initialize separate shell thread
            Shell shell = new Shell(inputStream, printStream);
            shell.register(this);
            shell.setPrompt(componentId + "> ");
            new Thread(shell).start();

            // compile pattern to match incoming requests
            Pattern pattern = Pattern.compile("([0-9.]+:[0-9]+) (.*)");
            while (true) {
                // bookkeeping
                byte[] buffer = new byte[256];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                // receive packet
                serverSocket.receive(packet);

                // pack byte buffer into string and match for monitoring request syntax
                String message = new String(buffer);
                Matcher matcher = pattern.matcher(message);

                // any matches are logged
                if (matcher.find()) {
                    // use regex capturing groups to extract server and address
                    String server = matcher.group(1).trim();
                    String address = matcher.group(2).trim();


                    Long serverCount = servers.get(server);
                    servers.put(server, 1L + (serverCount == null ? 0L : serverCount));
                    Long addressCount = addresses.get(address);
                    addresses.put(address, 1L + (addressCount == null ? 0L : addressCount));
                }
            }
        } catch (BindException exception) {
            System.err.printf("@%s: a fatal error has occurred: can't bind to port %d: %s%n", componentId, config.getInt("udp.port"), exception.getMessage());
        } catch (SocketException exception) {
            System.err.printf("@%s: monitoring server was closed via shell%n", componentId);
        } catch (IOException exception) {
            System.err.printf("@%s: a fatal io error has occurred%n", componentId);
            exception.printStackTrace();
        } finally {
            // this ugly construct is the price to pay for starting the shell in a different thread
            try {
                shutdown();
            } catch (StopShellException ignored) {}
        }
    }

    @Override
    @Command
    public void addresses() {
        // because this is a read-only group operation, output might either be stale or accurate
        // (missing rows, old values or accurate)
        // this is tolerable
        for (String key : addresses.keySet()) {
            printStream.printf("%s %d%n", key, addresses.get(key));
        }
    }

    @Override
    @Command
    public void servers() {
        // see considerations for addresses()
        for (String key : servers.keySet()) {
            printStream.printf("%s %d%n", key, servers.get(key));
        }
    }

    @Override
    @Command
    public void shutdown() throws StopShellException {
        // attempt closing server
        if (serverSocket != null && !serverSocket.isClosed())
            serverSocket.close();

        // stop shell
        throw new StopShellException();
    }

    public static void main(String[] args) throws Exception {
        try {
            IMonitoringServer server = ComponentFactory.createMonitoringServer(args[0], System.in, System.out);
            server.run();
        } catch (MisconfigurationException exception) {
            System.err.printf("@%s: a fatal error has occurred: configuration is lacking one of the following configuration options: %s%n", args[0], Arrays.toString(requiredConfigKeys));
        }
    }
}
